package com.crazymaker.cloud.disruptor.demo.business.impl;


import com.crazymaker.cloud.disruptor.demo.business.AsyncProducer;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.ExceptionHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import io.micrometer.core.instrument.MeterRegistry;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

@Slf4j
public class DisruptorProducer implements AsyncProducer {

    //一个translator可以看做一个事件初始化器，publicEvent方法会调用它
    //填充Event
    private static final EventTranslatorOneArg<LongEvent, Long> TRANSLATOR =
            new EventTranslatorOneArg<LongEvent, Long>() {
                public void translateTo(LongEvent event, long sequence, Long data) {
                    event.setValue(data);
                }
            };
    private final RingBuffer<LongEvent> ringBuffer;

    public DisruptorProducer() {
        this.ringBuffer = disruptor().getRingBuffer();
    }

    public void publishData(Long data) {
        log.info("生产一个数据：" + data + " | ringBuffer.remainingCapacity()= " + ringBuffer.remainingCapacity());

        ringBuffer.publishEvent(TRANSLATOR, data);


    }

    public synchronized void resize() {

    }

    public long remainingCapacity() {

        return ringBuffer.remainingCapacity();
    }


    private Disruptor<LongEvent> disruptor() {

        ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("DisruptorThreadPool").build();

        LongEventFactory eventFactory = new LongEventFactory();
        int bufferSize = 1024;
        Disruptor<LongEvent> disruptor = new Disruptor<>(eventFactory, bufferSize, namedThreadFactory,
                ProducerType.MULTI, new BlockingWaitStrategy());

        // 连接 消费者 处理器 ，两个消费者
        LongEventWorkHandler1 handler1 = new LongEventWorkHandler1();
        LongEventWorkHandler2 handler2 = new LongEventWorkHandler2();
        disruptor.handleEventsWith(handler1, handler2);
        //为消费者配置异常处理器
        disruptor.handleExceptionsFor(handler1).with(exceptionHandler);
        disruptor.handleExceptionsFor(handler2).with(exceptionHandler);

        // 开启 分裂者（事件分发）
        disruptor.start();
        return disruptor;
    }


    public void setMeterRegistry(MeterRegistry meterRegistry) {

    }

    ExceptionHandler exceptionHandler = new ExceptionHandler<LongEvent>() {
        @Override
        public void handleEventException(Throwable throwable, long l, LongEvent longEvent) {
            throwable.printStackTrace();
        }

        @Override
        public void handleOnStartException(Throwable throwable) {
            log.info("Exception Start to Handle!");
        }

        @Override
        public void handleOnShutdownException(Throwable throwable) {
            log.info("Exception Handled!");
        }
    };
}

